package cn.doitedu.rtdw.data_etl;

import ch.hsr.geohash.GeoHash;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/02/05
 * @Tips: 学大数据，到多易教育
 * @Desc:   行为日志数据 公共维度退化
 **/
public class E01_EtlJob_MallUserEventCommonDim {

    /**
     * Entry point. Reads raw mall user-behavior events from Kafka, lookup-joins
     * them (on processing time) against three HBase dimension tables — user
     * registration info, geohash-keyed geo area, and page info — then writes the
     * widened records to both a Kafka topic and a Doris table.
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 5 seconds; local file storage is used for dev runs.
        env.enableCheckpointing(5*1000);
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://doitedu:8020/etljob-checkpoint/job1/");
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");

        // Prefer Flink SQL whenever it can express the job.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka connector source table: raw user-behavior event log.
        tableEnv.executeSql(
                " CREATE TABLE mall_events_kafkasource (               "
                        // Physical fields
                        + "     username     string,                            "
                        + "     session_id   string,                            "
                        + "     eventId      string,                            "
                        + "     eventTime    bigint,                            "
                        + "     lat          double,                            "
                        + "     lng          double,                            "
                        + "     release_channel   string,                       "
                        + "     device_type       string,                       "
                        + "     properties   map<string,string>,                "
                        // Computed column: processing-time attribute required by the lookup joins below
                        + "     proc_time   AS PROCTIME()                       "
                        + " ) WITH (                                            "
                        + "  'connector' = 'kafka',                             "
                        + "  'topic' = 'mall-events-log',                       "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',   "
                        + "  'properties.group.id' = 'goo1',                    "
                        + "  'scan.startup.mode' = 'latest-offset',             "
                        + "  'value.format'='json',                             "
                        + "  'value.json.fail-on-missing-field'='false',        "
                        + "  'value.fields-include' = 'EXCEPT_KEY'              "
                        + " )                                                   ");

        // HBase connector table: user registration dimension (rowkey = username).
        tableEnv.executeSql(
                "CREATE TABLE ums_member_hbasesource ( " +
                        " username STRING,                              " +
                        " f ROW<id INT,phone STRING, status INT, create_time TIMESTAMP(3), gender INT, birthday DATE, province STRING, city STRING, job STRING, source_type INT>, " +
                        " PRIMARY KEY (username) NOT ENFORCED           " +
                        ") WITH (                                       " +
                        " 'connector' = 'hbase-2.2',                    " +
                        " 'table-name' = 'dim_user_info',               " +
                        " 'zookeeper.quorum' = 'doitedu:2181'           " +
                        ")");


        // HBase connector table: geo-area dimension keyed by (reversed) geohash.
        tableEnv.executeSql(
                "CREATE TABLE dim_geo_area_hbasesource ( " +
                        " geohash STRING,                                      " +
                        " f ROW<p STRING, c STRING, r STRING>, " +
                        " PRIMARY KEY (geohash) NOT ENFORCED            " +
                        ") WITH (                                       " +
                        " 'connector' = 'hbase-2.2',                    " +
                        " 'table-name' = 'dim_geo_area',                " +
                        " 'zookeeper.quorum' = 'doitedu:2181'           " +
                        ")");


        // HBase connector table: page-info dimension (rowkey = url prefix).
        tableEnv.executeSql(
                "CREATE TABLE dim_page_info_hbasesource ( " +
                        " url STRING,                                 " +
                        " f ROW<pt STRING, sv STRING>,    " +
                        " PRIMARY KEY (url) NOT ENFORCED              " +
                        ") WITH (                                     " +
                        " 'connector' = 'hbase-2.2',                  " +
                        " 'table-name' = 'dim_page_info',             " +
                        " 'zookeeper.quorum' = 'doitedu:2181'         " +
                        ")");


        // Kafka sink table for the dimension-widened detail records.
        // NOTE(review): column 'event_Id' is inconsistently cased vs. 'event_id' used by the
        // view and the Doris sink — the positional INSERT still works, but the emitted JSON
        // key will be 'event_Id'; confirm downstream consumers expect that before renaming.
        // NOTE(review): 'scan.startup.mode' and 'properties.group.id' only take effect if this
        // table is ever read as a source — presumably leftovers from a copied source DDL.
        tableEnv.executeSql(
                " CREATE TABLE mall_events_commondim_kafkasink(          "
                        + "     user_id           INT,                            "
                        + "     username          string,                         "
                        + "     session_id        string,                         "
                        + "     event_Id          string,                         "
                        + "     event_time        bigint,                         "
                        + "     lat               double,                         "
                        + "     lng               double,                         "
                        + "     release_channel   string,                         "
                        + "     device_type       string,                         "
                        + "     properties        map<string,string>,             "
                        + "     register_phone    STRING,                         "
                        + "     user_status       INT,                            "
                        + "     register_time     TIMESTAMP(3),                   "
                        + "     register_gender   INT,                            "
                        + "     register_birthday DATE, register_province STRING, "
                        + "     register_city STRING, register_job STRING, register_source_type INT,   "
                        + "     gps_province   STRING, gps_city STRING, gps_region STRING,             "
                        + "     page_type   STRING, page_service STRING         "
                        + " ) WITH (                                            "
                        + "  'connector' = 'kafka',                             "
                        + "  'topic' = 'mall-events-wide',                      "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',   "
                        + "  'properties.group.id' = 'testGroup',               "
                        + "  'scan.startup.mode' = 'earliest-offset',           "
                        + "  'value.format'='json',                             "
                        + "  'value.json.fail-on-missing-field'='false',        "
                        + "  'value.fields-include' = 'EXCEPT_KEY')             ");

        // Doris connector table: target sink for the widened records.
        // A timestamp-suffixed label prefix keeps stream-load labels unique across restarts.
        tableEnv.executeSql(
                " CREATE TABLE mall_events_commondim_dorissink(         "
                        +"     gps_province         VARCHAR(16),   "
                        +"     gps_city             VARCHAR(16),   "
                        +"     gps_region           VARCHAR(16),   "
                        +"     dt                   DATE,          "
                        +"     user_id              INT,           "
                        +"     username             VARCHAR(20),   "
                        +"     session_id           VARCHAR(20),   "
                        +"     event_id             VARCHAR(10),   "
                        +"     event_time           bigint,        "
                        +"     lat                  DOUBLE,        "
                        +"     lng                  DOUBLE,        "
                        +"     release_channel      VARCHAR(20),   "
                        +"     device_type          VARCHAR(20),   "
                        +"     properties           VARCHAR(40),   "
                        +"     register_phone       VARCHAR(20),   "
                        +"     user_status          INT,           "
                        +"     register_time        TIMESTAMP(3),  "
                        +"     register_gender      INT,           "
                        +"     register_birthday    DATE,          "
                        +"     register_province    VARCHAR(20),   "
                        +"     register_city        VARCHAR(20),   "
                        +"     register_job         VARCHAR(20),   "
                        +"     register_source_type INT        ,   "
                        +"     page_type            VARCHAR(20),   "
                        +"     page_service         VARCHAR(20)    "
                        + " ) WITH (                               "
                        + "    'connector' = 'doris',              "
                        + "    'fenodes' = 'doitedu:8030',         "
                        + "    'table.identifier' = 'dwd.mall_events_wide',  "
                        + "    'username' = 'root',                "
                        + "    'password' = '',                    "
                        + "    'sink.label-prefix' = 'doris_label"+System.currentTimeMillis()+"'"
                        + " )                                         ");


        // Lookup-join the raw event table against all three dimension tables and
        // register the widened result as a temporary view consumed by both sinks.
        tableEnv.createTemporaryFunction("geo",GeoHashFunction.class);
        tableEnv.executeSql(
                "CREATE TEMPORARY VIEW wide_view AS  SELECT  " +
                        "u.id as user_id,e.username,e.session_id,e.eventId as event_id,e.eventTime as event_time,e.lat,e.lng," +
                        "e.release_channel,e.device_type,e.properties," +
                        "u.f.phone as register_phone,u.f.status as user_status,u.f.create_time as register_time," +
                        "u.f.gender as register_gender,u.f.birthday as register_birthday,u.f.province as register_province,u.f.city as register_city," +
                        "u.f.job as register_job,u.f.source_type as register_source_type," +
                        "g.f.p as gps_province ,g.f.c as gps_city,g.f.r as gps_region, " +
                        "p.f.pt as page_type, p.f.sv as page_service " +
                        "FROM mall_events_kafkasource AS e  " +
                        "LEFT JOIN ums_member_hbasesource FOR SYSTEM_TIME AS OF e.proc_time AS u  ON e.username = u.username " +
                        // The geohash is REVERSE()d to match the HBase rowkeys, which are stored
                        // reversed to avoid region hotspotting.
                        "LEFT JOIN dim_geo_area_hbasesource FOR SYSTEM_TIME AS OF e.proc_time AS g ON REVERSE(geo(e.lat,e.lng)) = g.geohash   "+
                        "LEFT JOIN dim_page_info_hbasesource FOR SYSTEM_TIME AS OF e.proc_time AS p ON regexp_extract(e.properties['url'],'(^.*/).*?') = p.url "
        );


        // Widened results -> Kafka (positional column mapping against the sink DDL).
        tableEnv.executeSql("INSERT INTO mall_events_commondim_kafkasink  SELECT * from  wide_view");

        // Widened results -> Doris; properties map is serialized to a JSON string and
        // a partition date 'dt' is derived from the epoch-millis event_time.
        tableEnv.createTemporaryFunction("toJson",HashMap2Json.class);
        tableEnv.executeSql("INSERT INTO mall_events_commondim_dorissink                                      "
                +" SELECT                                                                         "
                +"     gps_province         ,                                                     "
                +"     gps_city             ,                                                     "
                +"     gps_region           ,                                                     "
                +"     TO_DATE(DATE_FORMAT(TO_TIMESTAMP_LTZ(event_time, 3),'yyyy-MM-dd')) as dt,  "
                +"     user_id              ,                                                     "
                +"     username             ,                                                     "
                +"     session_id           ,                                                     "
                +"     event_id             ,                                                     "
                +"     event_time           ,                                                     "
                +"     lat                  ,                                                     "
                +"     lng                  ,                                                     "
                +"     release_channel      ,                                                     "
                +"     device_type          ,                                                     "
                +"     toJson(properties) as properties     ,                                     "
                +"     register_phone       ,                                                     "
                +"     user_status          ,                                                     "
                +"     register_time        ,                                                     "
                +"     register_gender      ,                                                     "
                +"     register_birthday    ,                                                     "
                +"     register_province    ,                                                     "
                +"     register_city        ,                                                     "
                +"     register_job         ,                                                     "
                +"     register_source_type ,                                                     "
                +"     page_type            ,                                                     "
                +"     page_service                                                               "
                +" FROM   wide_view                                                               "
        );


    }

    /**
     * Scalar UDF: encodes a (lat, lng) pair as a 5-character geohash string.
     * Returns an empty string for null or invalid coordinates so that dirty log
     * records degrade to a missed dimension lookup instead of failing the job.
     */
    public static class GeoHashFunction extends ScalarFunction{
        public String eval(Double lat,Double lng){
            // Explicit null guard instead of relying on the auto-unboxing NPE
            // being caught below — makes the best-effort contract visible.
            if (lat == null || lng == null) {
                return "";
            }
            try {
                return GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 5);
            } catch (Exception ignored) {
                // Deliberate best-effort: out-of-range coordinates yield an empty
                // geohash (the LEFT JOIN then simply finds no geo dimension).
                return "";
            }
        }
    }


    /**
     * Scalar UDF: serializes the event's properties map to a JSON string for the
     * Doris sink, whose 'properties' column is a VARCHAR rather than a map.
     */
    public static class HashMap2Json extends ScalarFunction{
        public String eval(Map<String,String> properties){
            return JSON.toJSONString(properties);
        }

    }

}
